In [1]:
import os
import cv2
import glob
import pickle
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline

Advanced Lane Finding Project


Camera calibration

In [2]:
def calibration(n=6, m=9):
    calibration_images = glob.glob("./camera_cal/*.jpg")
    objpoints = []  # 3D points in real world space
    imgpoints = []  # 2D points in image space
    #fig, axs = plt.subplots(5,4, figsize=(16, 11))
    #fig.subplots_adjust(hspace = .2, wspace=.001)
    #axs = axs.ravel()
    objp = np.zeros((n*m, 3), np.float32)
    objp[:,:2] = np.mgrid[0:m, 0:n].T.reshape(-1,2)
    #i = 0
    for cal_img in calibration_images:
        #i = i + 1
        img = mpimg.imread(cal_img)
        gray = cv2.cvtColor(img,cv2.COLOR_RGB2GRAY)
        ret, corners = cv2.findChessboardCorners(gray, (m,n), None)
        #img2 = cv2.drawChessboardCorners(img, (m,n), corners, ret)
        #axs[i].imshow(img2)
        if ret == True:
            imgpoints.append(corners)
            objpoints.append(objp)
    
    img_size = (gray.shape[1], gray.shape[0])
    ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(objpoints, imgpoints, img_size, None, None)
    return mtx, dist

def undistort(img, mtx, dist):
    dst = cv2.undistort(img, mtx, dist, None, mtx)
    return dst
In [ ]:
#mtx, dist = calibration()
In [ ]:
#calibration = {}
#calibration["mtx"] = mtx
#calibration["dist"] = dist
#pickle.dump( calibration, open( "calibration.p", "wb" ) )
In [3]:
calibration = pickle.load(open( "calibration.p", "rb" ))
In [4]:
mtx = calibration["mtx"]
dist = calibration["dist"]
In [ ]:
#img = cv2.drawChessboardCorners(img, (m,n), corners, ret)
In [5]:
img = mpimg.imread("./test_images/test6.jpg")
In [6]:
dst = undistort(img, mtx, dist)
In [7]:
def plot(img, converted, title1='Original', title2='Converted', cmap="gray", flag1=False, flag2=False):
    f, (ax1, ax2) = plt.subplots(1, 2, figsize=(20,10))
    f.subplots_adjust(hspace = .2, wspace=.05)
    ax1.imshow(img)
    ax1.set_title(title1, fontsize=30)
    ax2.imshow(converted, cmap=cmap)
    ax2.set_title(title2, fontsize=30)
    
    if flag1 == True:
        mpimg.imsave("output_images/" + title1, img, format="jpg")
    if flag2 == True:
        mpimg.imsave("output_images/" + title2, converted, format="jpg")
In [8]:
plot(img, dst, 'Original', 'Undistorted', flag1=True, flag2=True)
In [10]:
def perspective_matrices(img):
    n, m = img.shape[:-1]
    # top-left top-right bottom-right bottom-left
    src = np.array([ [550, 450], [750,450], [1200, 700], [100, 700] ], dtype='float32')
    ofs = 0 # offset for dst points
    dst = np.array([ [ofs, ofs], [m, ofs], [m, n], [ofs, n] ], dtype='float32')
    M = cv2.getPerspectiveTransform(src, dst)
    Minv = cv2.getPerspectiveTransform(dst, src)
    return M, Minv
In [11]:
M, Minv = perspective_matrices(img)
In [12]:
def perspective(img, M):
    n, m = img.shape[:-1]
    warped = cv2.warpPerspective(img, M, (m,n), flags=cv2.INTER_LINEAR)
    return warped
In [13]:
warped = perspective(dst, M)
In [14]:
dst_copy = dst.copy()
In [15]:
top_left = (550, 450)
top_right = (750,450)
bottom_right = (1200, 700)
bottom_left = (100, 700)
In [16]:
# bottom
dst_copy = cv2.line(dst_copy, bottom_left, bottom_right, color=[255,0,0], thickness=6)
# top
dst_copy = cv2.line(dst_copy, top_left, top_right, color=[255,0,0], thickness=6)
# right
dst_copy = cv2.line(dst_copy, top_right, bottom_right, color=[255,0,0], thickness=6)
# left
dst_copy = cv2.line(dst_copy, bottom_left, top_left, color=[255,0,0], thickness=6)
In [17]:
plot(dst_copy, warped, 'Undistorted', 'Warped', flag1=True, flag2=True)
In [18]:
plot(dst, warped, 'Undistort', 'Warped')

gradient & color thresholding

In [19]:
def region(img, vertices):
    
    mask = np.zeros_like(img)   
    mask_color = 255
    cv2.fillPoly(mask, vertices, mask_color)
    masked_image = cv2.bitwise_and(img, mask)
    return masked_image
In [20]:
def grayscale(img):
    return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    
def hsvscale(img):
    hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
    return hsv

def hlsscale(img):
    hls = cv2.cvtColor(img, cv2.COLOR_RGB2HLS)
    return hls

def gaussian_blur(img, kernel_size):
    """Applies a Gaussian Noise kernel"""
    return cv2.GaussianBlur(img, (kernel_size, kernel_size), 0)

def sobel_thresh(img, sobel_kernel=5, flag="x", thresh_min=0, thresh_max=255):
    gray = grayscale(img)    
    if flag == "x":
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        sobel = np.absolute(sobelx)
    elif flag == "y":
        sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        sobel = np.absolute(sobely)
    ## magnitude
    elif flag == "m":
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        sobel = np.sqrt(sobelx**2 + sobely**2)
    ## direction
    elif flag == "d":
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        abs_sobelx = np.absolute(sobelx)
        abs_sobely = np.absolute(sobely)
        direction = np.arctan2(abs_sobely, abs_sobelx)
    
    if flag != "d":
        scaled_sobel = np.uint8(255*sobel/np.max(sobel))
    else:
        scaled_sobel = direction
        
    mask = np.zeros_like(scaled_sobel)
    mask[(scaled_sobel > thresh_min) & (scaled_sobel <= thresh_max)] = 1
    return mask

def color_thresh(img, flag="s", thresh_min=0, thresh_max=255):
    
    hls = hlsscale(img)
    if flag == "s":
        channel = hls[:,:,2]
    elif flag == "l":
        channel = hls[:,:,1]
    elif flag == "h":
        channel = hls[:,:,0]
    elif flag == "g":
        channel = grayscale(img)
    
    scaled_channel = channel[:]#np.uint8(255*channel/np.max(channel))
    mask = np.zeros_like(scaled_channel)
    mask[(scaled_channel > thresh_min) & (scaled_channel <= thresh_max)] = 1
    return mask
In [56]:
gauss = gaussian_blur(warped, 5)
In [57]:
thresh = sobel_thresh(gauss, flag="x", thresh_min=25, thresh_max=255)
In [23]:
plot(warped, thresh, 'Warped', 'x-Sobel', flag2=True)
In [58]:
color = color_thresh(gauss, "s", 150, 200)
In [27]:
plot(warped, color, 'Warped', 'S-Channel', flag2=True)
In [59]:
combined = np.zeros_like(color)
combined[(color == 1) | (thresh == 1)] = 1
In [29]:
plot(warped, combined, 'Warped', 'Combined', flag2=True)

build pipeline

In [30]:
def starting_centers(image):
    n, m = image.shape[:2]
    ofs = m // 2
    
    left_center = None
    right_center = None
    
    hist = np.sum(image[n//2:], axis=0)
    t = hist.shape[0]//4
    
    left_hist = hist[:t]
    if len(left_hist) != 0:
        left_center = left_hist.argmax()
    
    right_hist = hist[(ofs + t):]
    if len(right_hist) != 0:
        right_center = right_hist.argmax() + (ofs + t)
    
    return (left_center, right_center)
In [31]:
left_center, right_center = starting_centers(combined)

approx position left line y=720 in pix

In [32]:
left_center
Out[32]:
285

approx position right line y=720 in pix

In [33]:
right_center
Out[33]:
1163
In [34]:
def window_routine(image, center, height_win, t, k=100, flag=False):
    
    bottom_left = ((-k + center), (height_win*t))
    bottom_right = ((center + k), (height_win*t))
    upper_left = ((-k + center), (height_win)*(t+1))
    upper_right = ((center + k), (height_win)*(t+1))
    
    if flag == True:
        cv2.rectangle(gauss,bottom_left,upper_right,(0,0,255), 6) 
    
    win = image[(height_win*t):((height_win)*(t+1)), (-k + center):(center + k)]
    return win
In [35]:
def windows_compute(win, line, old_center, height_win, t, k=100, flag=False):
    
    line_w = win.nonzero()
    hist = np.sum(win, axis=0)
    
    if len(hist) == 0:
        return old_center, line
    
    center = hist.argmax()
    minpix = 50
    if len(line_w[0]) > minpix:
        new_center = old_center  + (center - k)
    else:
        new_center = old_center
    line[0].extend(line_w[1] + (-k + old_center))
    line[1].extend(line_w[0] + (height_win)*(t))
    
    if flag == True:
        cv2.line(gauss, (new_center, (height_win)*(t)), (new_center, (height_win)*(t+1)), (255,0,0), 6)
    return new_center, line
In [36]:
def detect_lines(image, left_center_old, right_center_old):
    
    n, m = image.shape[:2]
    n_windows = 8
    height_win = n // n_windows
    
    line_left = [[], []]
    line_right = [[], []]
    radius_left = []
    radius_right = []
    xl = []
    yl = []
    xr = []
    yr = []
    
    left_center, right_center = starting_centers(image)
    
    if left_center == None and right_center == None:
        xl, yl, xr, yr, radius_left, radius_right, left_center, right_center
    
    if left_center == None:
        left_center = left_center_old
    
    left_center_start = left_center
        
    if right_center == None:
        right_center = right_center_old
        
    right_center_start = right_center
    
    for t in range((n_windows-1),-1, -1):
        win_left = window_routine(image, left_center, height_win, t, flag=True)
        win_right = window_routine(image, right_center, height_win, t, flag=True)
        
        left_center, line_left = windows_compute(win_left, line_left, left_center, height_win, t, flag=True)
        right_center, line_right = windows_compute(win_right, line_right, right_center, height_win, t, flag=True)
    
    left_x = np.array(line_left[0], dtype="int32")
    left_y = np.array(line_left[1], dtype="int32")
    yl = np.linspace(0,n)
    
    ym_per_pix = 30/720 # meters per pixel
    xm_per_pix = 3.7/700
    
    if left_x is not []:
    
        left_fit = np.polyfit(left_y, left_x, 2)
        xl = left_fit[2] + left_fit[1]*yl + left_fit[0]*yl**2
        
        left_fit_radius = np.polyfit( (left_y * ym_per_pix), (left_x * xm_per_pix), 2)
        radius_left = (1 + (2 * left_fit_radius[0] * (yl * ym_per_pix) + left_fit_radius[1])**2) * (3/2) / (2 * left_fit_radius[0])
        
        xl = xl.astype("int32")
        yl = yl.astype("int32")
        
    right_x = np.array(line_right[0], dtype="int32")
    right_y = np.array(line_right[1], dtype="int32")
    yr = np.linspace(0,n)
    
    if right_x is not []:
        
        right_fit = np.polyfit(right_y, right_x, 2)
        xr = right_fit[2] + right_fit[1]*yr + right_fit[0]*yr**2
        
        right_fit_radius = np.polyfit( (right_y * ym_per_pix), (right_x * xm_per_pix), 2)
        radius_right = (1 + (2 * right_fit_radius[0] * (yr * ym_per_pix) + right_fit_radius[1])**2)*(3/2) / (2 * right_fit_radius[0]) 
        
        xr = xr.astype("int32")
        yr = yr.astype("int32")
    
    return xl, yl, xr, yr, radius_left, radius_right, left_center_start, right_center_start
In [60]:
xl, yl, xr, yr, radius_left, radius_right, left_center_start, right_center_start = detect_lines(combined, 
                                                                                                285, 1150)
In [39]:
plt.imshow(gauss, cmap="gray")
mpimg.imsave("output_images/" + "lines_win", gauss, format="jpg")
plt.show()
In [68]:
plt.figure(figsize=(20, 20))
plt.imshow(gauss, cmap="gray")
plt.plot(xl, yl, color="green", linewidth=4)
plt.plot(xr, yr, color="green", linewidth=4)
plt.ylim(720,0)
plt.savefig("output_images/lines_win_fit.jpg", bbox_inches='tight', dpi=200)
In [41]:
def pipeline(img, left_center_old=285, right_center_old=1150, flag=False):
    
    dst = undistort(img, mtx, dist)
    M, Minv = perspective_matrices(img)
    warped = perspective(dst, M)
    gauss = gaussian_blur(warped, 5)
    thresh = sobel_thresh(gauss, flag="x", thresh_min=25, thresh_max=255)
    color = color_thresh(gauss, "s",  thresh_min=150,  thresh_max=200)
    combined = np.zeros_like(color)
    combined[(color == 1) | (thresh == 1)] = 1
    xl, yl, xr, yr, radius_left, radius_right, left_center_start, right_center_start = detect_lines(combined,
                                                                                                    left_center_old, 
                                                                                                    right_center_old)
    
    if flag==True:
        return gauss, xl, yl, xr, yr, radius_left, radius_right, left_center_start, right_center_start, Minv
    else:
        return xl, yl, xr, yr, radius_left, radius_right, left_center_start, right_center_start, Minv
In [42]:
xl, yl, xr, yr, radius_left, radius_right, left_center_start, right_center_start, Minv = pipeline(img)

radius curvature left (m)

In [43]:
radius_left[len(radius_left)//2]
Out[43]:
979.72478223616974

radius curvature right (m)

In [44]:
radius_right[len(radius_right)//2]
Out[44]:
1056.8682211491766

position respect center [- left, + right] (m)

In [45]:
ofs = img.shape[1] // 2
In [46]:
position = (right_center_start + left_center_start)/2 - ofs
In [47]:
xm_per_pix = 3.7/700
In [48]:
position * xm_per_pix
Out[48]:
0.44400000000000001
In [49]:
def draw(img, xl, yl, xr, yr, Minv):
    new_img = np.copy(img)
    
    color_warp = np.zeros_like(new_img).astype(np.uint8)
    h, w = warped.shape[:2]
    
    if xl is not []:
        pts_left = np.array([np.transpose(np.vstack([xl, yl]))])
        cv2.polylines(color_warp, np.int32([pts_left]), isClosed=False, color=(0,255,255), thickness=15)
        
    if xr is not []:
        pts_right = np.array([np.flipud(np.transpose(np.vstack([xr, yr])))])
        cv2.polylines(color_warp, np.int32([pts_right]), isClosed=False, color=(255,0,255), thickness=15)
    
    if xl is not [] and xr is not []:
        pts = np.hstack((pts_left, pts_right))
        cv2.fillPoly(color_warp, np.int_([pts]), (0,255, 0))
    
    newwarp = cv2.warpPerspective(color_warp, Minv, (w, h))
    
#     if flag == False:
#         font = cv2.FONT_HERSHEY_DUPLEX
#         text = 'right curvature: ' + radius + 'm'
#         cv2.putText(new_img, text, (40,70), font, 1.5, (200,255,155), 2, cv2.LINE_AA)
#         text = 'distance center :' + position + 'm'  
#         cv2.putText(new_img, text, (40,120), font, 1.5, (200,255,155), 2, cv2.LINE_AA)
    
    result = cv2.addWeighted(new_img, 1, newwarp, 0.7, 0)
    return result
In [50]:
files = os.listdir("./test_images/")
In [51]:
fig, axs = plt.subplots(len(files), 3, figsize=(20,20))
fig.subplots_adjust(hspace = .2, wspace=.05)

for f in range(len(files)):
    file = files[f]
    image = mpimg.imread("./test_images/" + file)
    warped, xl, yl, xr, yr, radius_left, radius_right, left_center_start, right_center_start, Minv = pipeline(image, flag=True)
    result = draw(image, xl, yl, xr, yr, Minv)
    
    axs[f, 0].imshow(image)
    axs[f, 1].imshow(warped, cmap="gray")
    axs[f, 1].plot(xl, yl, color="blue", linewidth=4)
    axs[f, 1].plot(xr, yr, color="red", linewidth=4)
    axs[f, 2].imshow(result)
    mpimg.imsave("output_images/" + file, result, format="jpg")
In [52]:
# Define a class to receive the characteristics of each line detection
class Line():
    def __init__(self):
        # was the line detected in the last iteration?
        #self.detected = False  
        # x values of the last n fits of the line
        self.stack_x = [] 
        #average x values of the fitted line over the last n iterations
        self.bestx = None     
        #polynomial coefficients averaged over the last n iterations
        #self.best_fit = None  
        #polynomial coefficients for the most recent fit
        #self.current_fit = [np.array([False])]  
        #radius of curvature of the line in some units
        self.stack_radius_of_curvature = [] 
        self.radius_of_curvature = None 
        #distance in meters of vehicle center from the line
        self.line_base_pos = None 
        #difference in fit coefficients between last and new fits
        #self.diffs = np.array([0,0,0], dtype='float') 
        #x values for detected line pixels
        #self.allx = None  
        #y values for detected line pixels
        #self.ally = None
        
    def update(self, x, y, radius, center, ofs):
        if x is not []:
            if len(self.stack_x) > 10:
                self.stack_x.pop(0)
                self.stack_radius_of_curvature.pop(0)
            self.stack_x.append(x)
            self.best_x = np.mean(self.stack_x, axis=0)
            self.stack_radius_of_curvature.append(radius[len(radius)//2])
            self.radius_of_curvature = np.mean(self.stack_radius_of_curvature, axis=0)
            self.line_base_pos = abs(center -ofs)
In [53]:
left_line = Line()
right_line = Line()

def process_image(image):
    new_image = image.copy()
    h, w = new_image.shape[:2]
    ofs = w // 2
    
    xl, yl, xr, yr, radius_left, radius_right, left_center_start, right_center_start, Minv = pipeline(new_image)
    
    left_line.update(xl, yl, radius_left, left_center_start, ofs)
    right_line.update(xr, yr, radius_right, right_center_start, ofs)
    
    averaged_xl = left_line.best_x
    averaged_xr = right_line.best_x
    averaged_radius_l = left_line.radius_of_curvature
    averaged_radius_r = right_line.radius_of_curvature
    
    position = ofs - (right_line.line_base_pos + left_line.line_base_pos)/2
    
    result = draw(new_image, averaged_xl, yl, averaged_xr, yr, Minv)
    return result
In [54]:
from moviepy.editor import VideoFileClip
from IPython.display import HTML
In [70]:
video_output1 = 'project_video_output.mp4'
video_input1 = VideoFileClip('./videos/project_video.mp4')#.subclip(22,26)
processed_video = video_input1.fl_image(process_image)
%time processed_video.write_videofile(video_output1, audio=False)
[MoviePy] >>>> Building video project_video_output.mp4
[MoviePy] Writing video project_video_output.mp4
100%|█████████▉| 1260/1261 [04:02<00:00,  4.70it/s]
[MoviePy] Done.
[MoviePy] >>>> Video ready: project_video_output.mp4 

CPU times: user 5min 49s, sys: 4.78 s, total: 5min 54s
Wall time: 4min 6s
In [71]:
HTML("""
<video width="960" height="540" controls>
  <source src="{0}">
</video>
""".format(video_output1))
Out[71]:
In [ ]:
# video_output2 = 'challenge_video_output.mp4'
# video_input2 = VideoFileClip('challenge_video.mp4')#.subclip(22,26)
# processed_video = video_input2.fl_image(process_image)
# %time processed_video.write_videofile(video_output2, audio=False)
In [ ]:
# HTML("""
# <video width="960" height="540" controls>
#   <source src="{0}">
# </video>
# """.format(video_output2))
In [ ]:
# video_output2 = 'project_video_output_challenge.mp4'
# video_input2 = VideoFileClip('harder_challenge_video.mp4')
# processed_video = video_input2.fl_image(process_image)
# %time processed_video.write_videofile(video_output2, audio=False)